package com.hanxiaozhang.curator.source;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.*;
import org.apache.curator.utils.PathUtils;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 〈一句话功能简述〉<br>
 * 〈curator中InterProcessMutex源码的理解〉
 *
 * @author hanxinghua
 * @create 2022/8/31
 * @since 1.0.0
 */
public class MyInterProcessMutex implements InterProcessLock, Revocable<MyInterProcessMutex> {

    /**
     * 锁内部实现
     */
    private final MyLockInternals internals;

    /**
     * 基础路径
     */
    private final String basePath;

    /**
     * 线程数据
     */
    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

    /**
     * 锁数据实体
     */
    private static class LockData {
        /**
         * 拥有线程
         */
        final Thread owningThread;
        /**
         * 锁路径
         */
        final String lockPath;
        /**
         * 锁计数
         */
        final AtomicInteger lockCount = new AtomicInteger(1);

        private LockData(Thread owningThread, String lockPath) {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }

    /**
     * 默认锁名称
     */
    private static final String LOCK_NAME = "lock-";

    /**
     * 构造方法
     *
     * @param client 客户端
     * @param path   抢锁路径
     */
    public MyInterProcessMutex(CuratorFramework client, String path) {
        this(client, path, new StandardLockInternalsDriver());
    }

    /**
     * 构造方法
     *
     * @param client 客户端
     * @param path   抢锁路径
     * @param driver 自定义lock驱动实现分布式锁
     */
    public MyInterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
        this(client, path, LOCK_NAME, 1, driver);
    }

    /**
     * 构造函数，只包内可见
     *
     * @param client
     * @param path
     * @param lockName
     * @param maxLeases
     * @param driver
     */
    MyInterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
        basePath = PathUtils.validatePath(path);
        internals = new MyLockInternals(client, driver, path, lockName, maxLeases);
    }

    /**
     * 获取互斥锁--阻塞直到它可用
     * 注意：
     * 同一个线程可以重入调用acquire
     * 调用acquire()方法后，需相应调用release()来释放锁
     *
     * @throws Exception zk错误、连接中断错误
     */
    @Override
    public void acquire() throws Exception {
        if (!internalLock(-1, null)) {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }

    /**
     * 获取互斥锁--阻塞直到它可用 或给定时间到期
     * 注意：
     * 同一个线程可以重入调用acquire
     * 调用acquire()方法后，需相应调用release()来释放锁
     *
     * @param time time to wait
     * @param unit time unit
     * @return 如果获取了互斥锁则为true，否则为false
     * @throws Exception ZK errors, connection interruptions
     */
    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        return internalLock(time, unit);
    }

    /**
     * 如果互斥锁被当前JVM中的线程获取，则返回true
     * 即，当前线程持有互斥锁
     *
     * @return true/false
     */
    @Override
    public boolean isAcquiredInThisProcess() {
        return (threadData.size() > 0);
    }

    /**
     * Perform one release of the mutex if the calling thread is the same thread that acquired it.
     * If the thread had made multiple calls to acquire,the mutex will still be held when this method returns.
     * 如果调用的线程与调用acquired()方法的是同一个线程，执行一次互斥锁的释放
     * 如果线程已经多次调用acquire()方法，当这个方法返回时，互斥锁仍然会被持有
     *
     * @throws Exception ZK errors, interruptions, current thread does not own the lock
     */
    @Override
    public void release() throws Exception {
        /*
            Note on concurrency: a given lockData instance
            can be only acted on by a single thread so locking isn't necessary
         */

        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if (lockData == null) {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }

        int newLockCount = lockData.lockCount.decrementAndGet();
        if (newLockCount > 0) {
            return;
        }
        if (newLockCount < 0) {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try {
            internals.releaseLock(lockData.lockPath);
        } finally {
            threadData.remove(currentThread);
        }
    }

    /**
     * 返回当前所有参与锁的节点的排序列表
     *
     * @return list of nodes
     * @throws Exception ZK errors, interruptions, etc.
     */
    public Collection<String> getParticipantNodes() throws Exception {
        return LockInternals.getParticipantNodes(internals.getClient(), basePath, internals.getLockName(), internals.getDriver());
    }

    /**
     * 使锁可撤销
     *
     * @param listener
     */
    @Override
    public void makeRevocable(RevocationListener<MyInterProcessMutex> listener) {
        makeRevocable(listener, MoreExecutors.sameThreadExecutor());
    }

    /**
     * 使锁可撤销
     *
     * @param listener
     * @param executor
     */
    @Override
    public void makeRevocable(final RevocationListener<MyInterProcessMutex> listener, Executor executor) {
        internals.makeRevocable(new MyRevocationSpec(executor, new Runnable() {
            @Override
            public void run() {
                listener.revocationRequested(MyInterProcessMutex.this);
            }
        }));
    }


    /**
     * 由当前线程拥有锁
     *
     * @return
     */
    boolean isOwnedByCurrentThread() {
        LockData lockData = threadData.get(Thread.currentThread());
        return (lockData != null) && (lockData.lockCount.get() > 0);
    }

    /**
     * 获取锁节点的数据
     *
     * @return
     */
    protected byte[] getLockNodeBytes() {
        return null;
    }

    /**
     * 获取锁路径
     *
     * @return
     */
    protected String getLockPath() {
        LockData lockData = threadData.get(Thread.currentThread());
        return lockData != null ? lockData.lockPath : null;
    }

    /**
     * 内部锁
     *
     * @param time
     * @param unit
     * @return
     * @throws Exception
     */
    private boolean internalLock(long time, TimeUnit unit) throws Exception {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
           并发注意事项：
           lockData实例只能由单个线程执行，因此不需要锁定。
           解释： todo 解释对吗 2022-09-01
           lockData是InterProcessMutex类中的私有静态内部类，new InterProcessMutex()的对象是由一个线程处理，
           所以，lockData实例只能由单个线程执行。
        */

        Thread currentThread = Thread.currentThread();

        // 实现锁重入
        LockData lockData = threadData.get(currentThread);
        if (lockData != null) {
            lockData.lockCount.incrementAndGet();
            return true;
        }

        // 调用LockInternals.attemptLock()进行加锁
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        // 加锁成功，则将当前线程和加锁数据加到map中
        if (lockPath != null) {
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }

}
